---
slug: pollux-primer-part-2
title: "A Pollux Primer, Part 2"
authors: [oerling, pedroerp]
tags: [tech-blog, primer]
---

In this article, we will discuss how a distributed compute engine executes a
query similar to the one presented in
[our first article](https://pollux-lib.io/blog/pollux-primer-part-1):

```sql
SELECT l_partkey, count(*) FROM lineitem GROUP BY l_partkey;
```

We use the TPC-H schema to illustrate the example, and
[Prestissimo](https://github.com/prestodb/presto/tree/master/presto-native-execution)
as the compute engine orchestrating distributed query execution. Prestissimo is
responsible for the query engine frontend (parsing, resolving metadata,
planning, optimizing) and distributed execution (allocating resources and
shipping query fragments), and Pollux is responsible for the execution of plan
fragments within a single worker node. Throughout this article, we will point out
which functions are performed by Pollux and which by the distributed engine
(Prestissimo, in this example).

## Query Setup

Prestissimo first receives the query through a *coordinator* node, which is
responsible for parsing and planning the query. For our sample query, a
distributed query plan with three query fragments will be created:

1. The first fragment reads the *l_partkey* column from the lineitem table and
divides its output according to a hash of *l_partkey*.
2. The second fragment reads the output of the first fragment and updates a
hash table keyed by *l_partkey* that tracks the number of times each value of
*l_partkey* has been seen (this implements the count(*) aggregate function).
3. Once the second fragment has received all the rows from the first fragment,
the final fragment reads the contents of the hash tables.

<figure>
    <img src="/img/pollux-primer-part2-1.png" height= "70%" width="70%"/>
</figure>

The shuffle between the first two fragments partitions the data according to
*l_partkey*. Suppose there are 100 instances of the second fragment. If the hash
of *l_partkey* modulo 100 is 0, the row goes to the first Task in the second
stage; if it is 1, the row goes to the second Task, and so forth. In this way,
each second-stage Task gets a distinct subset of the rows. The shuffle between
the second and third stages is a *gather*, meaning that there is a single Task in the
third stage that reads the output of all 100 Tasks in the second stage.
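
To make the partitioning rule concrete, here is a minimal sketch of how a row's
destination Task could be derived. This is not the actual Pollux code; it only
illustrates the hash-modulo idea described above.

```cpp
#include <cstdint>
#include <functional>

// Minimal sketch of shuffle partitioning (not the actual Pollux code): a
// row's destination Task in the next stage is a hash of the partitioning
// key modulo the number of consumer Tasks.
int destinationFor(int64_t partKey, int numConsumerTasks) {
  return static_cast<int>(std::hash<int64_t>{}(partKey) % numConsumerTasks);
}
```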

A *Stage* is the set of Tasks that share the same plan fragment. A Task is the
main integration point between Prestissimo and Pollux; it’s the Pollux execution
instance that physically processes all or part of the data that passes through
the stage.

To set up the distributed execution, Prestissimo first selects the workers from
the pool of Prestissimo server processes it manages. Assuming that stage 1 is
10 workers wide, it selects 10 server processes and sends the first stage plan
to them. It then selects 100 workers for stage 2 and sends the second stage
plan to these. The last stage that gathers the result has only one worker, so
Prestissimo sends the final plan to only one worker. The set of workers for
each stage may overlap, so a single worker process may host multiple stages of
one query.

Let's now look more closely at what each worker does at query setup time.

## Task Setup

In Prestissimo, the message that sets up a Task in a worker is called *Task
Update*. A Task Update has the following information: the plan, configuration
settings, and an optional list of splits. Splits are further qualified by what
plan node they are intended for, and whether more splits for the recipient plan
node and split group will be coming.  

Since split generation involves enumerating files from storage (which may take
a while), Presto allows splits to be sent to workers asynchronously, so that
the generation of splits can run in parallel with the execution of the first
splits. Therefore, the first Task Update specifies the plan and the configs;
subsequent ones only add more splits.

Besides the plan, the coordinator provides configs as maps from string keys to
string values, both at the top level and at the connector level. The connector
configs hold settings for each connector; connectors are used by table scan and
table writer to deal with storage and file formats. These configs, along with
other information such as thread pools and the top-level memory pool, are handed
to the Task in a QueryCtx object. See `pollux/core/QueryCtx.h` in the Pollux repo
for details.
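
To illustrate the shape of this configuration, the snippet below builds the two
kinds of maps described above. The key names are made up for this example and
are not actual Pollux or Prestissimo settings.

```cpp
#include <map>
#include <string>

// Illustrative only: configuration arrives as string-to-string maps, one
// top-level map plus one map per connector. The keys below are hypothetical.
using ConfigMap = std::map<std::string, std::string>;

ConfigMap queryConfig = {
    {"max_drivers_per_task", "5"},           // hypothetical key
    {"preferred_output_batch_rows", "1024"}  // hypothetical key
};

std::map<std::string, ConfigMap> connectorConfigs = {
    {"hive", {{"cache_enabled", "true"}}}    // hypothetical connector settings
};
```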

## From Plans to Drivers and Operators

Once the Pollux Task is created, a TaskManager hands it splits to work on. This
is done with `Task::addSplit()`, and can be done after the Task has started
executing. See `pollux/exec/Task.h` for details.
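
Putting these pieces together, the life of a Task on a worker looks roughly like
the sketch below. The call names and signatures are approximations used only to
show the order of operations; the real interface is in `pollux/exec/Task.h`.

```cpp
// Rough sketch of the order of operations; names and signatures are
// approximate, not the exact Pollux API.
//
//   auto task = Task::create(taskId, planFragment, queryCtx);
//   Task::start(task, /*maxDrivers=*/5);  // split the plan into pipelines, make Drivers
//   task->addSplit(scanNodeId, split1);   // splits may keep arriving after start()
//   task->addSplit(scanNodeId, split2);
//   task->noMoreSplits(scanNodeId);       // no further splits will come for this node
```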

Let's zoom into what happens at Task creation: A PlanNode tree specifying what
the Task does is given to the Task as part of a *PlanFragment*. The most
important step done at Task creation is splitting the plan tree into pipelines.
Each pipeline then gets a DriverFactory, which is the factory class that makes
Drivers for the pipeline. The Drivers, in turn, contain the Operators
that do the work of running the query. The DriverFactories are made in
LocalPlanner.cpp. See `LocalPlanner::plan` for details.

<figure>
    <img src="/img/pollux-primer-part2-2.png" height= "70%" width="70%"/>
</figure>

Following the execution model known as Volcano, the plan is represented by an
operator tree where each node consumes the output of its child operators, and
returns output to the parent operator. The root node is typically a
PartitionedOutputNode or a TableWriteNode. A leaf node is either a
TableScanNode, an ExchangeNode, or a ValuesNode (used for query literals). The
full set of Pollux PlanNodes can be found in `pollux/core/PlanNode.h`.

The PlanNodes mostly correspond to Operators. PlanNodes are not executable as
such; they are only a structure describing how to make the Drivers and Operators,
which do the actual execution. If the tree of nodes has a single branch, then
the plan is a single pipeline. If it has nodes with more than one child
(input), then the second input of the node becomes a separate pipeline; for
example, the build side of a hash join runs as its own pipeline, separate from
the probe-side pipeline.

`Task::start()` creates the DriverFactories, which then create the Drivers. To
start execution, the Drivers are queued on a thread pool executor. The main
function that runs Operators is `Driver::runInternal()`. See this function for
the details of how the Driver and its Operators interact: `Operator::isBlocked()`
determines whether the Driver can advance. If it cannot, the Driver goes off
thread until a future is realized, which then puts it back on the executor.

`getOutput()` retrieves data from an Operator and `addInput()` feeds data into
another Operator. The order of execution is to advance the last Operator that
can produce output and then feed its output to the next Operator. If an Operator
cannot produce output, then `getOutput()` is called on the Operator before it,
until one is found that can produce data. If no Operator is blocked and no
Operator can produce output, then the plan is at its end, and the `noMoreInput()`
method is called on each Operator. This can unblock the production of results;
for example, an OrderBy can only produce its output after it knows that it has
received all of its input.
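
The following is a greatly simplified sketch of that loop, using stand-in types
defined inline; the real logic, including blocking futures and yielding, lives
in `Driver::runInternal()`.

```cpp
#include <memory>
#include <vector>

// Stand-in types for illustration; the real interfaces are in pollux/exec/.
struct Batch {};

struct Operator {
  virtual ~Operator() = default;
  virtual bool isBlocked() { return false; }
  virtual std::shared_ptr<Batch> getOutput() = 0;
  virtual void addInput(std::shared_ptr<Batch> input) = 0;
  virtual void noMoreInput() {}
};

// One step of a Driver's loop over its pipeline: operators[0] is the source
// (e.g. TableScan) and operators.back() is the sink (e.g. PartitionedOutput).
void runPipelineOnce(std::vector<std::unique_ptr<Operator>>& operators) {
  // Starting just before the sink and walking towards the source, find the
  // last Operator that can produce output and feed it to its consumer.
  for (int i = static_cast<int>(operators.size()) - 2; i >= 0; --i) {
    if (operators[i]->isBlocked()) {
      return;  // The real Driver goes off thread and waits on a future here.
    }
    if (auto output = operators[i]->getOutput()) {
      operators[i + 1]->addInput(std::move(output));
      return;
    }
  }
  // Nothing is blocked and nothing can produce output: the plan is at its
  // end, so notify every Operator that no more input will arrive.
  for (auto& op : operators) {
    op->noMoreInput();
  }
}
```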

## The Minimal Pipeline: Table Scan and Repartitioning

**Table Scan.** In the table scan stage of our sample query, we have one pipeline
with two operators: TableScan and PartitionedOutput. Assume this pipeline has
five Drivers, and that all these five Drivers go on the thread pool executor.
The PartitionedOutput cannot do anything because it has no input. The
TableScan::getOutput() is then called. See `pollux/exec/TableScan.cpp` for
details. The first action TableScan takes is to look for a Split with
`Task::getSplitOrFuture()`. If there is no split available, this returns a
future. The Driver will then park itself off thread and install a callback on
the future that will reschedule the Driver when a split is available.

It could also be the case that there is no split and the Task has been notified
that no more splits will be coming. In this case TableScan would be at the end.
Finally, if a Split is available, TableScan interprets it. Given a TableHandle
specification provided as part of the plan (list of columns and filters), the
Connector (as specified in the Split) makes a DataSource. The DataSource
handles the details of IO and file and table formats. 

The DataSource is then given the split. After this, DataSource::next() can be
called repeatedly to get vectors (batches) of output from the file/section of
file specified by the Split. If the DataSource is at its end, TableScan looks
for the next split. See `Connector.h` for the Connector and DataSource
interfaces.
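
The sketch below compresses that decision logic into a single step, using
stand-in types defined inline; the real code is in `pollux/exec/TableScan.cpp`,
and the Connector and DataSource interfaces are in `Connector.h`. For brevity,
it assumes a freshly added split yields at least one batch.

```cpp
#include <memory>
#include <optional>

// Stand-in types for illustration only.
struct Split {};
struct Batch {};

struct DataSource {
  virtual ~DataSource() = default;
  virtual void addSplit(const Split& split) = 0;
  // Returns nullptr once the file (or file section) of the split is exhausted.
  virtual std::shared_ptr<Batch> next() = 0;
};

enum class ScanResult { kData, kBlocked, kFinished };

// Simplified shape of a table scan step: drain the current DataSource; when
// it runs dry, move on to the next split, block if no split is available yet,
// or finish once the Task says no more splits will arrive.
ScanResult scanStep(DataSource& source,
                    std::optional<Split> availableSplit,
                    bool noMoreSplits,
                    std::shared_ptr<Batch>& out) {
  if ((out = source.next())) {
    return ScanResult::kData;          // current split still has batches
  }
  if (availableSplit) {
    source.addSplit(*availableSplit);  // start reading the next split
    out = source.next();
    return ScanResult::kData;          // assumes the new split has data
  }
  // No split right now: either wait on a future or report end of data.
  return noMoreSplits ? ScanResult::kFinished : ScanResult::kBlocked;
}
```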

**Repartitioning.** Now we have traced execution up to the TableScan returning its
first batch of output. The Driver feeds this to PartitionedOutput::addInput().
See `PartitionedOutput.cpp` for details. PartitionedOutput first calculates a
hash on the partitioning key, in this case *l_partkey*, producing a destination
number for each row of the input batch (the RowVectorPtr `input_`).

There is a partly filled serialization buffer (VectorStreamGroup) for each
destination worker. If there are 100 Tasks in the second stage, each
PartitionedOutput has 100 destinations, each with one VectorStreamGroup. The
main function of a VectorStreamGroup is `append()`, which takes a RowVectorPtr
and a set of row numbers in it. It serializes each value identified by the row
numbers and adds it to the partly formed serialization. When enough rows have
accumulated in the VectorStreamGroup, it produces a SerializedPage. See `flush()`
in `PartitionedOutput.cpp`.
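
A toy version of this routing step is sketched below. The struct standing in
for VectorStreamGroup only records row numbers; the actual serialization and
flushing into SerializedPages happens in `PartitionedOutput.cpp`.

```cpp
#include <cstdint>
#include <functional>
#include <vector>

// Toy stand-in for a per-destination VectorStreamGroup: it only records which
// row numbers of the current batch are bound for this destination.
struct StreamGroup {
  std::vector<int32_t> rows;
  void append(int32_t row) { rows.push_back(row); }
};

// Sketch of the row routing done in PartitionedOutput::addInput(): hash each
// row's partitioning key to pick a destination, then append the row number to
// that destination's partly filled buffer. When a buffer grows large enough,
// the real code serializes it into a SerializedPage (see flush()).
void routeBatch(const std::vector<int64_t>& partitionKeys,
                std::vector<StreamGroup>& destinations) {
  for (int32_t row = 0; row < static_cast<int32_t>(partitionKeys.size()); ++row) {
    const size_t hash = std::hash<int64_t>{}(partitionKeys[row]);
    destinations[hash % destinations.size()].append(row);
  }
}
```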

The SerializedPage is a self-contained serialized packet of information that
can be transmitted over the wire to the next stage. Each such page only
contains rows intended for the same recipient. These pages are then queued up
in the worker process's OutputBufferManager. Note the code with BlockingReason
in `flush()`: the buffer manager maintains separate queues for all consumers of
all Tasks. If a queue is full, adding output may block. This returns a future
that is realized when there is space to add more data to the queue, which in
turn depends on when the consuming Task fetches the data.
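
A toy model of one per-consumer queue can make this blocking behavior concrete.
It is far simpler than the real OutputBufferManager: the callback stands in for
the future that a blocked Driver waits on, and the page-count limit stands in
for a byte-based limit.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

struct SerializedPage {};

// Toy per-consumer queue: enqueue() tells the producer whether it should
// block, and fetch() (called by the consumer) invokes the stored callback,
// standing in for realizing the producer's future.
struct DestinationQueue {
  std::deque<SerializedPage> pages;
  std::size_t maxPages = 8;                // stand-in for a byte-based limit
  std::function<void()> onSpaceAvailable;  // resumes the blocked producer

  bool enqueue(SerializedPage page) {
    pages.push_back(std::move(page));
    return pages.size() >= maxPages;  // true: the producing Driver must block
  }

  // Assumes the consumer only calls fetch() when a page is available.
  SerializedPage fetch() {
    SerializedPage page = std::move(pages.front());
    pages.pop_front();
    if (onSpaceAvailable && pages.size() < maxPages) {
      onSpaceAvailable();  // space again: unblock the producer
    }
    return page;
  }
};
```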

Shuffles in Prestissimo are implemented by PartitionedOutput at the producer
and Exchange at the consumer end. The OutputBufferManager keeps ready
serialized data for consumers to pick up. The binding of these to the Presto
wire protocols is in `TaskManager.cpp` for the producer side, and in
`PrestoExchangeSource.cpp` for the consumer side.

## Recap

We presented how a plan becomes executable and how data moves between and
inside Operators. We discussed that a Driver can block (go off thread) to wait
for a Split to become available, or to wait for its output to be consumed. We
have now scratched the surface of running a leaf stage of a distributed query.
There is much more to Operators and vectors, though. In the next installment of
Pollux Primer, we will look at what the second stage of our minimal sample query
does.
